package com.gbase8c.dmt.service;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.util.ConfigParser;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.gbase8c.dmt.config.DmtConfig;
import com.gbase8c.dmt.dao.*;
import com.gbase8c.dmt.dao.file.*;
import com.gbase8c.dmt.migration.MigrationExecutionService;
import com.gbase8c.dmt.migration.MigrationObjectService;
import com.gbase8c.dmt.model.migration.config.*;
import com.gbase8c.dmt.db.metadata.Metadata;
import com.gbase8c.dmt.db.metadata.MetadataFactory;
import com.gbase8c.dmt.model.enums.MigrationObjectType;
import com.gbase8c.dmt.model.migration.dto.*;
import com.gbase8c.dmt.model.migration.record.MigrationRecordDto;
import com.gbase8c.dmt.model.enums.MigrationTaskStatus;

import com.google.common.collect.Lists;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;

import javax.sql.DataSource;
import java.io.File;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;

@Slf4j
public class TaskService {

    private DmtConfig dmtConfig;
    private TaskDao taskDao;
    private DboDao dboDao;
    private DataSourceDao dataSourceDao;
    private RecordDao recordDao;
    private SnapshotDao snapshotDao;

    MigrationExecutionService migrationExecutionService;

    public TaskService(DmtConfig dmtConfig) {
        this.dmtConfig = dmtConfig;
        this.taskDao = new TaskDaoImpl(dmtConfig);
        this.dboDao = new DboDaoImpl(dmtConfig);
        this.dataSourceDao = new DataSourceDaoImpl(dmtConfig);
        this.recordDao = new RecordDaoImpl(dmtConfig);
        this.snapshotDao = new SnapshotDaoImpl(dmtConfig);
        this.dboDao = new DboDaoImpl(dmtConfig);
        this.migrationExecutionService = new MigrationExecutionService(dmtConfig,
                dataSourceDao, snapshotDao,
                dboDao, taskDao, recordDao);
    }

    public Snapshot createSnapshot(String dsId, String note) {
        DataSourceDto dataSourceDto = dataSourceDao.get(dsId);
        Metadata metadata = MetadataFactory.getMetaData(dataSourceDto);
        Snapshot snapshot = metadata.snapshot();
        snapshot.setDbType(dataSourceDto.getDbType());
        snapshot.setStatus(Snapshot.Status.valid);
        snapshot.setNote(note);
        snapshotDao.save(snapshot);
        return snapshot;
    }

    @Getter
    @Setter
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    private static class MigrationTask {
        private Action action;
        private String taskId;
        private Task task;
        private DboDto dboDto;
        private Date startTime;
        private Date endTime;
    }

    private enum Action {
        precheck,
        start;
    }

    private String fileDir() {
        String workdir = dmtConfig.getWorkDir();
        String id = dmtConfig.getTaskId();
        return StringUtils.joinWith(File.separator, workdir, id, "configuration");
    }

    private String file(String jsonName) {
        String workdir = dmtConfig.getWorkDir();
        String id = dmtConfig.getTaskId();
        jsonName = StringUtils.trim(jsonName);
        return StringUtils.joinWith(File.separator, workdir, id, "configuration", jsonName);
    }

    public void resumeBt(String tables) {
        List<Configuration> configurations = Lists.newArrayList();
        String[] ts = null;
        if (StringUtils.isNotBlank(tables)) {
            ts = tables.split(",");
            for (int i=0; i< ts.length; i++) {
                ts[i] = ts[i] + ".json";
            }
        } else {
            File dir = new File(fileDir());
            ts = dir.list();
        }
        if (ts != null) {
            configurations = Arrays.stream(ts).map(this::file)
                    .map(ConfigParser::parse)
                    .peek(conf -> conf.set("core.container.model", "taskGroup"))
                    .collect(Collectors.toList());
        }

        Flowable.fromIterable(configurations)
                .observeOn(Schedulers.io())
                .parallel()
                .runOn(Schedulers.io())
                .doOnNext(this::clean)
                .doOnNext(this::start)
                .sequential()
                .collect(Collectors.toList())
                .blockingSubscribe(
                        list -> {
                            log.info("断点续传成功");
                        },
                        error -> {
                            log.error("断点续传出错", error);
                        }
                );

    }

    public void clean(Configuration configuration) throws SQLException {
        List<Configuration> taskConfigs = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
        Task task = taskDao.get(dmtConfig.getTaskId());
        DataSourceDto tarDataSourceDto = dataSourceDao.get(task.getTar());
        DataSource dataSource = DataSourceFactory.getDataSource(tarDataSourceDto);
        for (Configuration taskConfig : taskConfigs) {
            String state = taskConfig.getString("state");
            if ("FAILED".equalsIgnoreCase(state) || "RUNNING".equalsIgnoreCase(state)) {
                String tarTable = taskConfig.getString("writer.parameter.table");
                String cleanSql;
                String splitPk = taskConfig.getString("reader.parameter.splitPk");
                if (StringUtils.isNotBlank(splitPk)) {
                    String querySql = taskConfig.getString("reader.parameter.querySql");
                    List<String> columnList = taskConfig.getList("reader.parameter.columnList", String.class);
                    List<String> tarColumnList = taskConfig.getList("writer.parameter.column", String.class);
                    String whereSql;
                    String whereKey = " where ";
                    int index = querySql.toLowerCase().indexOf(whereKey);
                    whereSql = querySql.substring(index);

                    String tarColumn = tarColumnList.get(columnList.indexOf(splitPk));
                    whereSql = whereSql.replaceAll(splitPk, tarColumn);

                    cleanSql = "delete from " + tarTable + whereSql;
                } else {
                    // truncate table xxx
                    cleanSql = "truncate table " + tarTable;
                }
                // 执行cleanSql
                log.info("开始表 {} 分块清理, sql为: {}", tarTable, cleanSql);
                QueryRunner queryRunner = new QueryRunner(dataSource);
                queryRunner.execute(cleanSql);
                log.info("完成表 {} 分块清理, sql为: {}", tarTable, cleanSql);
            }
        }
    }

    public void start(Configuration configuration) {
        Engine engine = new Engine();
        engine.start(configuration);
    }

    public void start() {
        String id = dmtConfig.getTaskId();
        Task task = taskDao.get(id);
        task.setId(id);
        MigrationTask mt = MigrationTask.builder()
                .action(Action.start)
                .taskId(task.getId())
                .task(task)
                .build();
        Flowable.fromIterable(Lists.newArrayList(mt))
                .observeOn(Schedulers.io())
                .parallel()
                .runOn(Schedulers.io())
                .doOnNext(this::collect)
                .doOnNext(this::convert)
                .doOnNext(migrationTask -> migrationExecutionService.runMigrationExecution(migrationTask.getDboDto()))
                .sequential()
                .collect(Collectors.toList())
                .blockingSubscribe(
                        list -> {
                            List<MigrationRecordDto> rs = recordDao.list(id);
                            MigrationRecordDto recordDto = rs.stream().max(Comparator.comparing(MigrationRecordDto::getEndTime)).get();
                            task.setStatus(recordDto.getMigrationTaskStatus());
                            MigrationObjectService.updateTaskStatus(mt.getTaskId(), recordDto.getMigrationTaskStatus());
                            MigrationObjectService.removeTaskStatus(mt.getTaskId());
                            taskDao.update(task);
                        },
                        error -> {
                            log.error("migrate task error", error);
                            task.setStatus(MigrationTaskStatus.FAILED);
                            MigrationObjectService.updateTaskStatus(mt.getTaskId(),MigrationTaskStatus.FAILED);
                            MigrationObjectService.removeTaskStatus(mt.getTaskId());
                            taskDao.update(task);
                        }
                );
    }

    private void collect(MigrationTask mt) {
        Task task = mt.getTask();

        MigrationTaskStatus status;
        if (Action.precheck == mt.action) {
            status = MigrationTaskStatus.PRECHECKING;
        } else {
            status = MigrationTaskStatus.COLLECTING;
        }
        task.setStatus(status);
        taskDao.update(task);
        MigrationObjectService.updateTaskStatus(mt.getTaskId(), status);

        DataSourceDto srcDataSourceDto = dataSourceDao.get(task.getSrc());
        Metadata metadata = MetadataFactory.getMetaData(srcDataSourceDto);

        //获取选择迁移的表空间名字,在获取表对象时用来判断是否开启迁入对应表空间.
        List<SchemaDto> schemaDtos = Lists.newArrayList();
        SchemaConfig schemaConfig = task.getSchemaConfig();
        MigrateConfig migrateConfig = task.getMigrateConfig();
        boolean migrateConstraint = migrateConfig.getMigratePk()
                || migrateConfig.getMigrateFk()
                || migrateConfig.getMigrateCheckConstraint()
                || migrateConfig.getMigrateUniqueConstraint();

        boolean migrateIndex = migrateConfig.getMigrateIndex();

        List<TableSpaceDto> tableSpaceDtos = Lists.newArrayList();
        TableSpaceConfig tableSpaceConfig = task.getTableSpaceConfig();
        if (ObjectUtils.isNotEmpty(tableSpaceConfig)) {
            List<TableSpaceConfig.TableSpaceMapper> tableSpaceMappers = tableSpaceConfig.getTableSpaceMappers();
            if (CollectionUtils.isNotEmpty(tableSpaceMappers)) {
                for (TableSpaceConfig.TableSpaceMapper tableSpaceMapper : tableSpaceMappers) {
                    TableSpaceDto tableSpaceDto = TableSpaceDto.builder()
                            .name(tableSpaceMapper.getTableSpaceName())
                            .srcPath(tableSpaceMapper.getSrcPath())
                            .schema("other")
                            .tarPath(tableSpaceMapper.getTarPath())
                            .task(task)
                            .migrationObjectType(MigrationObjectType.TABLESPACE)
                            .build();
                    if (!StringUtils.isBlank(tableSpaceMapper.getUseName())) {
                        tableSpaceDto.setUseName(tableSpaceMapper.getUseName());
                    }
                    if (!StringUtils.isBlank(tableSpaceMapper.getRelative())) {
                        tableSpaceDto.setRelative(tableSpaceMapper.getRelative());
                    }
                    if (!StringUtils.isBlank(tableSpaceMapper.getSpcMaxSize())) {
                        tableSpaceDto.setSpcMaxSize(tableSpaceMapper.getSpcMaxSize());
                    }
                    if (!StringUtils.isBlank(tableSpaceMapper.getSpcOptions())) {
                        tableSpaceDto.setSpcOptions(tableSpaceMapper.getSpcOptions());
                    }
                    tableSpaceDtos.add(tableSpaceDto);
                }
            }
        }

        List<String> tableSpaceNames = tableSpaceDtos.stream()
                .map(TableSpaceDto::getName).collect(Collectors.toList());

        if (ObjectUtils.isNotEmpty(schemaConfig)) {
            List<SchemaConfig.SchemaMapper> schemaMappers = schemaConfig.getSchemaMappers();
            if (CollectionUtils.isNotEmpty(schemaMappers)) {
                for (SchemaConfig.SchemaMapper schemaMapper : schemaMappers) {
                    SchemaDto schemaDto = SchemaDto.builder()
                            .migrationObjectType(MigrationObjectType.SCHEMA)
                            .name(schemaMapper.getSrcSchemaName())
                            .schema(schemaMapper.getSrcSchemaName())
                            .tarSchema(schemaMapper.getTarSchemaName())
                            .tarName(schemaMapper.getTarSchemaName())
                            .task(task)
                            .build();

                    // table
                    if (schemaMapper.getTableMigrated()) {
                        List<TableDto> tableDtos = tableDtos(task, schemaMapper);
                        schemaDto.setTableDtos(tableDtos);
                    }

                    // view
                    if (schemaMapper.getViewMigrated()) {
                        List<ViewDto> viewDtos = viewDtos(task, schemaMapper);
                        schemaDto.setViewDtos(viewDtos);
                    }

                    // sequence
                    if (schemaMapper.getSequenceMigrated()) {
                        List<SequenceDto> sequenceDtos = sequenceDtos(task, schemaMapper);
                        schemaDto.setSequenceDtos(sequenceDtos);
                    }

                    // function
                    if (schemaMapper.getFunctionMigrated()) {
                        List<FunctionDto> functionDtos = functinDtos(task, schemaMapper);
                        schemaDto.setFunctionDtos(functionDtos);
                    }

                    // synonym
                    if (schemaMapper.getSynonymMigrated()) {
                        List<SynonymDto> synonymDtos = synonymDtos(task, schemaMapper);
                        schemaDto.setSynonymDtos(synonymDtos);
                    }


                    //todo:  add before release trigger
                    if (schemaMapper.getTriggerMigrated()) {
                        List<TriggerDto> triggerDtos = triggerDtos(task, schemaMapper);
                        schemaDto.setTriggerDtos(triggerDtos);
                    }

                    // constraint
                    if (migrateConstraint) {
                        List<ConstraintDto> cs = Lists.newArrayList();
                        List<TableDto> tableDtos = schemaDto.getTableDtos();
                        for (TableDto tableDto : tableDtos) {
                            List<ConstraintDto> tableConstraints = metadata.getConstraintDtos(schemaDto.getName(), tableDto.getName());
                            if (CollectionUtils.isNotEmpty(tableConstraints)) {
                                cs.addAll(tableConstraints);
                            }
                        }
                        if (CollectionUtils.isNotEmpty(cs)) {
                            List<ConstraintDto> constraintDtos = cs.stream()
                                    .map(dto -> metadata.getConstraintDto(schemaMapper.getSrcSchemaName(), dto))
                                    .peek(dto -> {
                                        dto.setTarSchema(schemaMapper.getTarSchemaName());
                                        dto.setTask(task);
                                        dto.setMigrationObjectType(MigrationObjectType.valueOf(dto.getConsType()));
                                    })
                                    .collect(Collectors.toList());
                            schemaDto.setConstraintDtos(constraintDtos);
                        }
                    }

                    // index
                    if (migrateIndex) {
                        List<IndexDto> is = Lists.newArrayList();
                        List<TableDto> tableDtos = schemaDto.getTableDtos();
                        for (TableDto tableDto : tableDtos) {
                            List<IndexDto> tableIndexes = metadata.getIndexDtos(schemaDto.getName(), tableDto.getName());
                            if (CollectionUtils.isNotEmpty(tableIndexes)) {
                                is.addAll(tableIndexes);
                            }
                        }
                        if (CollectionUtils.isNotEmpty(is)){
                            List<IndexDto> indexDtos = is.stream()
                                    .map(dto -> metadata.getIndexDto(dto.getSchema(), dto, tableSpaceNames))
                                    .peek(dto -> {
                                        dto.setTarSchema(schemaMapper.getTarSchemaName());
                                        dto.setTask(task);
                                        dto.setMigrationObjectType(MigrationObjectType.INDEX);
                                    })
                                    .collect(Collectors.toList());
                            schemaDto.setIndexDtos(indexDtos);
                        }
                    }
                    schemaDtos.add(schemaDto);
                }
            }
        }

        DboDto dbo = DboDto.builder()
                .task(task)
                .schemaDtos(schemaDtos)
                .tableSpaceDtos(tableSpaceDtos)
                .build();
        mt.setDboDto(dbo);
    }

    private void convert(MigrationTask mt) {
        Task task = mt.getTask();
        MigrateConfig migrateConfig = task.getMigrateConfig();
        Boolean migrateData = migrateConfig.getMigrateData();

        DboDto dboDto = mt.getDboDto();

        MigrationTaskStatus status = MigrationTaskStatus.CONVERTING;

        task.setStatus(status);
        taskDao.update(task);
        MigrationObjectService.updateTaskStatus(mt.getTaskId(), status);

        DataSourceDto src = dataSourceDao.get(dboDto.getTask().getSrc());
        DataSourceDto tar = dataSourceDao.get(dboDto.getTask().getTar());
        Metadata tarMetadata = MetadataFactory.getMetaData(tar);
        List<TriggerDto> tableScaleLessThan0Triggers = Lists.newArrayList();

        // schema
        dboDto.getSchemaDtos()
                .forEach(schemaDto -> {
                    tarMetadata.schemaConvert(schemaDto);
                    String sql = tarMetadata.schemaSql(schemaDto);
                    schemaDto.setTarSql(sql);
                    schemaDto.setTask(null);
                    schemaDto.setContents(Lists.newArrayList(sql));
                });

        // sequence
        dboDto.getSchemaDtos().stream()
                .flatMap(schemaDto -> schemaDto.getSequenceDtos().stream())
                .forEach(sequenceDto -> {
                    tarMetadata.sequenceConvert(sequenceDto);
                    String sql = tarMetadata.sequenceSql(sequenceDto);
                    sequenceDto.setTarSql(sql);
                    sequenceDto.setTask(null);
                    sequenceDto.setContents(Lists.newArrayList(sql));
                });

        // tableSpace
        dboDto.getTableSpaceDtos()
                .forEach(tableSpaceDto -> {
                    tarMetadata.tableSpaceConvert(tableSpaceDto);
                    String sql = tarMetadata.tableSpaceSql(tableSpaceDto);
                    tableSpaceDto.setTarSql(sql);
                    tableSpaceDto.setTask(null);
                    tableSpaceDto.setContents(Lists.newArrayList(sql));
                });

        // table
        dboDto.getSchemaDtos().stream()
                .flatMap(schemaDto -> schemaDto.getTableDtos().stream())
                .forEach(tableDto -> {
                    tarMetadata.tableConvert(tableDto);
                    List<TriggerDto> triggerDtos = tableDto.getTriggerDtos();
                    if (CollectionUtils.isNotEmpty(triggerDtos)){
                        tableScaleLessThan0Triggers.addAll(triggerDtos);
                    }
                    List<String> sqls = tarMetadata.tableSql(tableDto);
                    tableDto.setTarSqls(sqls);
                    tableDto.setTask(null);
                    tableDto.setContents(sqls);
                    tableDto.setTableMapper(tableDto.toMapper());
                });

        // index
        dboDto.getSchemaDtos().stream()
                .flatMap(schemaDto -> schemaDto.getIndexDtos().stream())
                .forEach(indexDto -> {
                    tarMetadata.indexConvert(indexDto);
                    String sql = tarMetadata.indexSql(indexDto);
                    indexDto.setTarSql(sql);
                    indexDto.setTask(null);
                    indexDto.setContents(Lists.newArrayList(sql));
                });

        List<String> tableNameList = Lists.newArrayList();
        dboDto.getSchemaDtos().forEach(
                schemaDto -> {
                    List<String> collect = schemaDto.getTableDtos().stream().map(TableDto::getName).collect(Collectors.toList());
                    tableNameList.addAll(collect);
                }
        );
        // constraint
        dboDto.getSchemaDtos().stream()
                .flatMap(schemaDto -> schemaDto.getConstraintDtos().stream())
                .forEach(constraintDto -> {
                    tarMetadata.constraintConvert(constraintDto,tableNameList);
                    String sql = tarMetadata.constraintSql(constraintDto);
                    constraintDto.setTarSql(sql);
                    constraintDto.setTask(null);
                    constraintDto.setContents(Lists.newArrayList(sql));
                });

        // view
        dboDto.getSchemaDtos().stream()
                .flatMap(schemaDto -> schemaDto.getViewDtos().stream())
                .forEach(viewDto -> {
                    tarMetadata.viewConvert(viewDto);
                    String sql = tarMetadata.viewSql(viewDto);
                    viewDto.setTask(null);
                    viewDto.setTarSql(sql);
                    viewDto.setContents(Lists.newArrayList(sql));
                });

        // function
        dboDto.getSchemaDtos().stream()
                .flatMap(schemaDto -> schemaDto.getFunctionDtos().stream())
                .forEach(functionDto -> {
                    tarMetadata.functionConvert(functionDto, src, tar);
                    String sql = tarMetadata.functionSql(functionDto);
                    functionDto.setTask(null);
                    functionDto.setTarSql(sql);
                    functionDto.setContents(Lists.newArrayList(sql));
                    functionDto.setConvertible(true);
                });

        // trigger
//        dboDto.getSchemaDtos().forEach(schemaDto -> {
//            tableScaleLessThan0Triggers.forEach(triggerDto -> {
//                if (triggerDto.getTarSchema().equals(schemaDto.getTarSchema())) {
//                    schemaDto.getTriggerDtos().add(triggerDto);
//                }
//            });
//        });
        for (SchemaDto schemaDto : dboDto.getSchemaDtos()) {
            for (TriggerDto triggerDto : tableScaleLessThan0Triggers) {
                if (triggerDto.getTarSchema().equalsIgnoreCase(schemaDto.getTarName())) {
                    List<TriggerDto> triggerDtos = schemaDto.getTriggerDtos();
                    triggerDtos.add(triggerDto);
                    schemaDto.setTriggerDtos(triggerDtos);
                }
            }
        }
        dboDto.getSchemaDtos().stream()
                .flatMap(schemaDto -> schemaDto.getTriggerDtos().stream())
                .forEach(triggerDto -> {
                    tarMetadata.triggerConvert(triggerDto);
                    List<String> sql = tarMetadata.triggerSql(triggerDto);
                    triggerDto.setTask(null);
                    triggerDto.setTarSql(sql);
                    triggerDto.setMigrationObjectType(MigrationObjectType.TRIGGER);
                    triggerDto.setContents(Lists.newArrayList(sql));
                });

        // synonym
        dboDto.getSchemaDtos().stream()
                .flatMap(schemaDto -> schemaDto.getSynonymDtos().stream())
                .forEach(synonymDto -> {
                    tarMetadata.synonymConvert(synonymDto);
                    String sql = tarMetadata.synonynSql(synonymDto);
                    synonymDto.setTask(null);
                    synonymDto.setTarSql(sql);
                    synonymDto.setContents(Lists.newArrayList(sql));
                });

        dboDto.setId(task.getId());
    }

    private List<String> tableNames(Task task, SchemaConfig.SchemaMapper schemaMapper) {
        List<String> tableNames = null;
        TableConfig tableConfig = schemaMapper.getTableConfig();
        if (ObjectUtils.isNotEmpty(tableConfig)) {
            String srcSchemaName = schemaMapper.getSrcSchemaName();
            DataSourceDto dataSourceDto = dataSourceDao.get(task.getSrc());
            Metadata metadata = MetadataFactory.getMetaData(dataSourceDto);
            TableConfig.ScopeType st = tableConfig.getScopeType();
            switch (st) {
                case full:
                    tableNames = metadata.getTableNames(srcSchemaName);
                    break;
                case include:
                    tableNames = tableConfig.getValues();
                    break;
                case exclude:
                    tableNames = metadata.getTableNames(srcSchemaName);
                    tableNames.removeAll(tableConfig.getValues());
                    break;
            }
        }
        return CollectionUtils.isNotEmpty(tableNames) ? tableNames : Lists.newArrayList();
    }

    private List<TableDto> tableDtos(Task task, SchemaConfig.SchemaMapper schemaMapper) {
        List<TableDto> tableDtos = Lists.newArrayList();
        List<String> tableNames = tableNames(task, schemaMapper);
        if (ObjectUtils.isNotEmpty(tableNames)) {
            DataSourceDto dataSourceDto = dataSourceDao.get(task.getSrc());
            Metadata metadata = MetadataFactory.getMetaData(dataSourceDto);
            String srcSchemaName = schemaMapper.getSrcSchemaName();
            String tarSchemaName = schemaMapper.getTarSchemaName();
            List<String> tableSpaceNames = metadata.getTableSpaceNames();
            if (CollectionUtils.isNotEmpty(tableNames)) {
                for (String tableName : tableNames) {
                    TableDto tableDto = metadata.getTableDto(srcSchemaName, tableName, tableSpaceNames);
                    tableDto.setTask(task);
                    tableDto.setTaskId(task.getId());
                    tableDto.setMigrationObjectType(MigrationObjectType.TABLE);
                    tableDto.setTarSchema(tarSchemaName);
                    tableDtos.add(tableDto);
                }
            }
        }
        return tableDtos;
    }

    private List<SynonymDto> synonymDtos(Task task, SchemaConfig.SchemaMapper schemaMapper) {
        String schemaName = schemaMapper.getSrcSchemaName();
        String tarSchemaName = schemaMapper.getTarSchemaName();
        List<SynonymDto> synonymDtos = Lists.newArrayList();
        DataSourceDto dataSourceDto = dataSourceDao.get(task.getSrc());
        Metadata metadata = MetadataFactory.getMetaData(dataSourceDto);
        List<SynonymDto> synonymDtoList = metadata.getSynonymDtos(schemaName);
        if (CollectionUtils.isNotEmpty(synonymDtoList)) {
            synonymDtos = synonymDtoList.stream()
                    .map(synonymDto -> metadata.getSynonymDto(synonymDto.getSchema(), synonymDto.getName()))
                    .peek(synonymDto -> {
                        synonymDto.setTarSchema(tarSchemaName);
                        synonymDto.setTask(task);
                        synonymDto.setMigrationObjectType(MigrationObjectType.SYNONYM);
                    })
                    .collect(Collectors.toList());
        }
        return synonymDtos;
    }

    private List<SequenceDto> sequenceDtos(Task task, SchemaConfig.SchemaMapper schemaMapper) {
        String schemaName = schemaMapper.getSrcSchemaName();
        String tarSchemaName = schemaMapper.getTarSchemaName();
        List<SequenceDto> sequenceDtos = Lists.newArrayList();
        DataSourceDto dataSourceDto = dataSourceDao.get(task.getSrc());
        Metadata metadata = MetadataFactory.getMetaData(dataSourceDto);
        List<String> sequenceNames = metadata.getSequencesNames(schemaName);
        if (CollectionUtils.isNotEmpty(sequenceNames)) {
            sequenceDtos = sequenceNames.stream()
                    .map(sequenceName -> metadata.getSequenceDto(schemaName, sequenceName))
                    .peek(sequenceDto -> {
                        sequenceDto.setTarSchema(tarSchemaName);
                        sequenceDto.setTask(task);
                        sequenceDto.setMigrationObjectType(MigrationObjectType.SEQUENCE);
                    })
                    .collect(Collectors.toList());
        }
        return sequenceDtos;
    }

    private List<ViewDto> viewDtos(Task task, SchemaConfig.SchemaMapper schemaMapper) {
        String schemaName = schemaMapper.getSrcSchemaName();
        String tarSchemaName = schemaMapper.getTarSchemaName();
        List<ViewDto> viewDtos = Lists.newArrayList();
        DataSourceDto dataSourceDto = dataSourceDao.get(task.getSrc());
        Metadata metadata = MetadataFactory.getMetaData(dataSourceDto);
        List<String> viewNames = metadata.getViewNames(schemaName);
        if (CollectionUtils.isNotEmpty(viewNames)) {
            viewDtos = viewNames.stream()
                    .map(viewName -> metadata.getViewDto(schemaName, viewName))
                    .peek(viewDto -> {
                        viewDto.setTarSchema(tarSchemaName);
                        viewDto.setTask(task);
                        viewDto.setMigrationObjectType(MigrationObjectType.VIEW);
                    })
                    .collect(Collectors.toList());

        }
        return viewDtos;
    }

    private List<FunctionDto> functinDtos(Task task, SchemaConfig.SchemaMapper schemaMapper) {
        String schemaName = schemaMapper.getSrcSchemaName();
        String tarSchemaName = schemaMapper.getTarSchemaName();
        List<FunctionDto> functionDtos = Lists.newArrayList();
        DataSourceDto dataSourceDto = dataSourceDao.get(task.getSrc());
        Metadata metadata = MetadataFactory.getMetaData(dataSourceDto);
        List<String> functionNames = metadata.getFunctionNames(schemaName);
        if (CollectionUtils.isNotEmpty(functionNames)) {
            functionDtos = functionNames.stream()
                    .map(functionName -> metadata.getFunctionDto(schemaName, functionName))
                    .peek(functionDto -> {
                        functionDto.setTarSchema(tarSchemaName);
                        functionDto.setTask(task);
                        functionDto.setMigrationObjectType(MigrationObjectType.FUNCTION);
                    })
                    .collect(Collectors.toList());
        }
        return functionDtos;
    }

    private List<TriggerDto> triggerDtos(Task task, SchemaConfig.SchemaMapper schemaMapper) {
        String schemaName = schemaMapper.getSrcSchemaName();
        String tarSchemaName = schemaMapper.getTarSchemaName();
        List<TriggerDto> triggerDtos = Lists.newArrayList();
        DataSourceDto dataSourceDto = dataSourceDao.get(task.getSrc());
        Metadata metadata = MetadataFactory.getMetaData(dataSourceDto);
        List<String> triggerNames = metadata.getTriggerNames(schemaName);
        if (CollectionUtils.isNotEmpty(triggerNames)) {
            triggerDtos = triggerNames.stream()
                    .map(triggerName -> metadata.getTriggerDto(schemaName, triggerName))
                    .peek(triggerDto -> {
                        triggerDto.setTarSchema(tarSchemaName);
                        triggerDto.setTask(task);
                        triggerDto.setMigrationObjectType(MigrationObjectType.TRIGGER);
                    })
                    .collect(Collectors.toList());
        }
        return triggerDtos;
    }

}
